package org.codehaus.activemq.transport.vm;

import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.transport.AbstractTransportChannel;
import org.codehaus.activemq.transport.TransportChannel;

/**
 * A transport channel that exchanges packets with a peer through a pair of
 * {@link Channel} queues instead of a network connection.
 *
 * <p>Packets passed to {@link #asyncSend(Packet)} are put on the send channel.
 * A background daemon thread, created by {@link #start()}, takes objects from
 * the receive channel and hands each packet to {@code doConsumePacket}. The
 * peer end of the connection is obtained from {@link #createServerSide()},
 * which returns a channel wired to the same two queues in the opposite
 * direction.
 */
public class VmTransportChannel extends AbstractTransportChannel
  implements Runnable
{
  private static final Log log = LogFactory.getLog(VmTransportChannel.class);

  /** Sentinel placed on a channel to tell the peer that this side has stopped. */
  private static final Object TERMINATE = new Object();

  /** Counter used to give each receiver thread a distinct name. */
  private static int lastThreadId = 0;

  private final Channel sendChannel;
  private final Channel receiveChannel;
  private final SynchronizedBoolean closed;
  private final SynchronizedBoolean started;

  /** The receiver thread; remains {@code null} until {@link #start()} is called. */
  private Thread thread;

  /**
   * Creates a transport over the two given channels.
   *
   * @param sendChannel    channel that outgoing packets are put on
   * @param receiveChannel channel that incoming packets are taken from
   */
  public VmTransportChannel(Channel sendChannel, Channel receiveChannel) {
    this.sendChannel = sendChannel;
    this.receiveChannel = receiveChannel;
    this.closed = new SynchronizedBoolean(false);
    this.started = new SynchronizedBoolean(false);
  }

  /** Creates a transport whose two queues each have a capacity of 1000. */
  public VmTransportChannel() {
    this(1000);
  }

  /**
   * Creates a transport backed by two new bounded queues.
   *
   * @param capacity capacity used for both the send and the receive queue
   */
  public VmTransportChannel(int capacity) {
    this(new BoundedLinkedQueue(capacity), new BoundedLinkedQueue(capacity));
  }

  /**
   * Stops this channel. Only the first call has any effect.
   *
   * <p>The {@link #TERMINATE} sentinel is put on the send channel and, when
   * called from a thread other than the receiver thread, this method waits
   * for the receiver thread to finish.
   */
  public void stop()
  {
    if (this.closed.commit(false, true)) {
      super.stop();
      try
      {
        this.sendChannel.put(TERMINATE);

        // run() calls stop() itself when it receives TERMINATE; joining the
        // current thread would block forever. The thread is also null when
        // start() was never called.
        // NOTE(review): the receiver only wakes up when something arrives on
        // receiveChannel (or it is interrupted), so this join relies on the
        // peer answering with TERMINATE - confirm that is always the case.
        Thread receiver = this.thread;
        if (receiver != null && receiver != Thread.currentThread()) {
          receiver.join();
        }
      }
      catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of losing it.
        Thread.currentThread().interrupt();
        log.trace(toString() + " now closed with exception: " + e);
      }
      catch (Exception e) {
        log.trace(toString() + " now closed with exception: " + e);
      }
    }
  }

  /**
   * Starts the daemon receiver thread. Only the first call has any effect.
   *
   * @throws JMSException declared by the transport contract; not thrown here
   */
  public void start()
    throws JMSException
  {
    if (this.started.commit(false, true)) {
      this.thread = new Thread(this, "VM Transport: " + getNextThreadId());
      this.thread.setDaemon(true);
      this.thread.start();
    }
  }

  /**
   * Returns the next receiver-thread id and advances the counter.
   *
   * @return the id to use in the next thread name
   */
  public static synchronized int getNextThreadId() {
    return lastThreadId++;
  }

  /**
   * Puts the packet on the send channel exactly once, blocking while the
   * channel cannot accept it.
   *
   * <p>An interrupt received while waiting does not abort the send: the put
   * is retried and the thread's interrupt status is restored once the packet
   * has been enqueued.
   *
   * @param packet the packet to send
   * @throws JMSException declared by the transport contract; not thrown here
   */
  public void asyncSend(Packet packet)
    throws JMSException
  {
    boolean interrupted = false;
    try
    {
      while (true) {
        try
        {
          this.sendChannel.put(packet);
          // Enqueued: leave the retry loop so the packet is sent only once.
          return;
        }
        catch (InterruptedException e)
        {
          // Remember the interrupt and retry the put.
          interrupted = true;
        }
      }
    }
    finally
    {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * @return always {@code false}
   */
  public boolean isMulticast()
  {
    return false;
  }

  /**
   * Receiver loop: takes objects from the receive channel until this channel
   * is closed or the {@link #TERMINATE} sentinel arrives, passing each packet
   * to {@code doConsumePacket}.
   */
  public void run()
  {
    while (!this.closed.get()) {
      try {
        Object answer = this.receiveChannel.take();
        if (answer == TERMINATE) {
          log.trace("The socket peer is now closed");
          stop();
          return;
        }
        if (answer != null) {
          Packet packet = (Packet)answer;

          // The channel may have been closed while take() was blocked.
          if (this.closed.get()) {
            break;
          }
          doConsumePacket(packet);
        }
      }
      catch (InterruptedException e)
      {
        // Intentionally ignored: the loop condition re-checks the closed
        // flag. Re-interrupting here would make the next take() fail
        // immediately and spin.
      }
    }
  }

  /**
   * Reports the given failure through {@code onAsyncException} and stops the
   * channel, unless it is already closed.
   *
   * @param ex the failure that triggered the close
   */
  private void doClose(Exception ex)
  {
    if (!this.closed.get()) {
      JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
      jmsEx.setLinkedException(ex);
      onAsyncException(jmsEx);
      stop();
    }
  }

  /**
   * @return a description containing the send channel
   */
  public String toString()
  {
    return "VmTransportChannel: " + this.sendChannel;
  }

  /**
   * Creates the peer end of this transport: a channel that sends on this
   * channel's receive queue and receives from this channel's send queue.
   *
   * @return the peer transport channel
   * @throws JMSException declared by the signature; not thrown here
   */
  public TransportChannel createServerSide()
    throws JMSException
  {
    return new VmTransportChannel(this.receiveChannel, this.sendChannel);
  }
}